{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a8916e1b-6de5-4096-964d-87e2f7b5b746",
   "metadata": {
    "deletable": false,
    "editable": true,
    "id": "a8916e1b-6de5-4096-964d-87e2f7b5b746",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%session_id_prefix native-hudi-sql-\n",
    "%glue_version 3.0\n",
    "%idle_timeout 60\n",
    "%%configure \n",
    "{\n",
    "  \"--conf\": \"spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension\",\n",
    "  \"--datalake-formats\": \"hudi\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192",
   "metadata": {
    "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "bucket_name = \"<Your S3 bucket name>\"\n",
    "bucket_prefix = \"<Your S3 bucket prefix>\"\n",
    "database_name = \"hudi_sql\"\n",
    "table_name = \"product_cow\"\n",
    "table_prefix = f\"{bucket_prefix}/{database_name}/{table_name}\"\n",
    "table_location = f\"s3://{bucket_name}/{table_prefix}\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750",
   "metadata": {
    "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750"
   },
   "source": [
    "## Clean up existing resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "62dc44d1-4c48-4f24-bfce-60637972914b",
   "metadata": {
    "id": "62dc44d1-4c48-4f24-bfce-60637972914b",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "import boto3\n",
    "\n",
    "## Create a database with the name hudi_sql to host hudi tables if not exists.\n",
    "try:\n",
    "    glue = boto3.client('glue')\n",
    "    glue.create_database(DatabaseInput={'Name': database_name})\n",
    "except glue.exceptions.AlreadyExistsException:\n",
    "    print(f\"{database_name} already exist\")\n",
    "\n",
    "## Delete files in S3\n",
    "s3 = boto3.resource('s3')\n",
    "bucket = s3.Bucket(bucket_name)\n",
    "bucket.objects.filter(Prefix=f\"{table_prefix}/\").delete()\n",
    "\n",
    "## Drop table in Glue Data Catalog\n",
    "try:\n",
    "    glue = boto3.client('glue')\n",
    "    glue.delete_table(DatabaseName=database_name, Name=table_name)\n",
    "except glue.exceptions.EntityNotFoundException:\n",
    "    print(f\"{database_name}.{table_name} does not exist\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "08706080-9af8-4721-bdfa-0872e0407c68",
   "metadata": {
    "id": "08706080-9af8-4721-bdfa-0872e0407c68"
   },
   "source": [
    "## Create Hudi table with sample data using catalog sync"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8",
   "metadata": {
    "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8",
    "outputId": "a5d61103-8120-4869-ca80-e42f8f282055",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "import time\n",
    "\n",
    "ut = time.time()\n",
    "\n",
    "product = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n",
    "]\n",
    "\n",
    "df_products = spark.createDataFrame(Row(**x) for x in product)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a4c76d90-7137-40ec-9577-837e4716404b",
   "metadata": {
    "id": "a4c76d90-7137-40ec-9577-837e4716404b",
    "outputId": "fc98feed-2b27-4ee5-cf80-5f948863d44c",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "df_products.createOrReplaceTempView(\"tmp_product_cow\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e639391d-6195-4f99-83ee-87d0a5c21e61",
   "metadata": {
    "id": "e639391d-6195-4f99-83ee-87d0a5c21e61"
   },
   "source": [
    "The following query create an external hudi table with the configuration specified in the options. For more information, check https://hudi.apache.org/docs/table_management/#create-table-for-an-external-hudi-table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4b53f9a9-fe7a-456f-aca9-7d5d80ab1b01",
   "metadata": {
    "id": "4b53f9a9-fe7a-456f-aca9-7d5d80ab1b01",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "query = f\"\"\"\n",
    "create table if not exists {database_name}.{table_name}  using hudi\n",
    "options (\n",
    "    type = 'cow',\n",
    "    primaryKey = 'product_id',\n",
    "    preCombineField = 'updated_at',\n",
    "    path = '{table_location}',\n",
    "    hoodie.table.name = '{table_name}',\n",
    "    hoodie.datasource.write.operation = 'upsert',\n",
    "    hoodie.datasource.hive_sync.enable = 'true',\n",
    "    hoodie.datasource.hive_sync.database = '{database_name}',\n",
    "    hoodie.datasource.hive_sync.table = '{table_name}',\n",
    "    hoodie.datasource.hive_sync.partition_fields = 'category',\n",
    "    hoodie.datasource.hive_sync.partition_extractor_class = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',\n",
    "    hoodie.datasource.hive_sync.use_jdbc = 'false',\n",
    "    hoodie.datasource.write.hive_style_partitioning = 'true'\n",
    ")\n",
    "partitioned by (category)\n",
    "AS SELECT * FROM tmp_product_cow\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fda19a10-4b69-4272-99a2-1b1156e937c2",
   "metadata": {
    "id": "fda19a10-4b69-4272-99a2-1b1156e937c2"
   },
   "source": [
    "## Read from Hudi table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "777215c7-272c-44a8-84e3-e799c0e7852f",
   "metadata": {
    "id": "777215c7-272c-44a8-84e3-e799c0e7852f",
    "outputId": "a3acaf5e-2ad2-4193-bcbe-e68e64879624",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM hudi_sql.product_cow"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c013d49c-da63-4910-b423-4ebd0e346e1f",
   "metadata": {
    "id": "c013d49c-da63-4910-b423-4ebd0e346e1f"
   },
   "source": [
    "## Upsert records into Hudi table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97",
   "metadata": {
    "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97",
    "outputId": "57a02e9f-9c41-4009-c7eb-c626ec6442d1",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "\n",
    "product_updates = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'updated_at': ut, 'category': 'Electronics'}, # Update\n",
    "    {'product_id': '00006', 'product_name': 'Desk', 'price': 50, 'updated_at': ut, 'category': 'Furniture'} # Insert\n",
    "]\n",
    "df_product_updates = spark.createDataFrame(Row(**x) for x in product_updates)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "77e3e8f8-8dce-4c92-97e3-a5ffc942950b",
   "metadata": {
    "id": "77e3e8f8-8dce-4c92-97e3-a5ffc942950b",
    "outputId": "6cb0dd20-281a-424b-de06-8f38c099260e",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "df_product_updates.createOrReplaceTempView(\"tmp_product_cow_updates\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a59c8932-d1c7-493c-b800-bff9d204c7ef",
   "metadata": {
    "id": "a59c8932-d1c7-493c-b800-bff9d204c7ef",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "INSERT OVERWRITE hudi_sql.product_cow SELECT * FROM tmp_product_cow_updates"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dd563164-fa7d-4166-b171-46d7a1a623f8",
   "metadata": {
    "id": "dd563164-fa7d-4166-b171-46d7a1a623f8",
    "outputId": "b8edcea0-910e-4648-ca37-e1feb3d444d3",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM hudi_sql.product_cow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "66730a3a-2242-4575-b431-3f670178de6d",
   "metadata": {
    "id": "66730a3a-2242-4575-b431-3f670178de6d",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "UPDATE hudi_sql.product_cow  SET price =price * 1.2, updated_at = unix_timestamp(current_timestamp())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6ed8df6c-b8b2-42fc-8d5a-cdaeba62e7b2",
   "metadata": {
    "id": "6ed8df6c-b8b2-42fc-8d5a-cdaeba62e7b2",
    "outputId": "70ce1a6a-4d17-462a-f22e-1cf8bd8a95f4",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM hudi_sql.product_cow"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c200714e",
   "metadata": {},
   "source": [
    "## Stop Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fbe71f6b-6cec-48be-9e00-28ceae60026f",
   "metadata": {
    "id": "fbe71f6b-6cec-48be-9e00-28ceae60026f",
    "outputId": "5b210a0a-32c5-4567-8c21-551d337e6e77",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "name": "hudi_sql.ipynb",
   "provenance": []
  },
  "kernelspec": {
   "display_name": "Glue PySpark",
   "language": "python",
   "name": "glue_pyspark"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
